# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
from abc import ABCMeta, abstractmethod
from hysop.constants import __VERBOSE__, __DEBUG__
from hysop.backend import __HAS_OPENCL_BACKEND__, __HAS_CUDA_BACKEND__
from hysop.tools.units import bytes2str, time2str
from hysop.tools.htypes import check_instance
from hysop.tools.contexts import Timer
from hysop.core.memory.mem_utils import memory_repport, virtual_memory
from hysop.core.memory.buffer import Buffer, PooledBuffer
from hysop.core.memory.allocator import AllocatorBase
if __HAS_OPENCL_BACKEND__:
from pyopencl.tools import bitlog2
elif __HAS_CUDA_BACKEND__:
from pycuda.tools import bitlog2
else:
def bitlog2(x):
assert x > 0
p = 0
while x:
p += 1
x >>= 1
return p - 1
[docs]
class MemoryPool(metaclass=ABCMeta):
"""
pyopencl/pycuda like memory pool extended to be compatible for all backends.
"""
def __new__(
cls,
name,
allocator,
max_alloc_bytes=None,
mantissa_bits=4,
verbose=None,
**kwds,
):
return super().__new__(cls, **kwds)
def __init__(
self,
name,
allocator,
max_alloc_bytes=None,
mantissa_bits=4,
verbose=None,
**kwds,
):
"""
Builds a MemoryPool from an allocator.
Provides an allocator like interface.
Parameters
----------
name: str
the name of this allocator for logging purposes
allocator: hysop.core.memory.AllocatorBase
allocator used by this memory pool, must be an immediate allocator.
verbose: bool
turn on or off allocator messages (defaults to hysop verbosity configuration)
max_alloc_bytes: int
maximum number of bytes this pool will try to allocate before raising a MemoryError.
default value is:
-80% of physical host memory if allocator is a HostAllocator.
-None (no limit) if allocator is a DeviceAllocator
mantissa_bits: int
subdivisions bits of power of two allocations.
higher values means more bins (less memory waste) but less buffer reuse.
Notes
-----
An allocator that fails to allocate memory should raise a MemoryError
to expect the pool to work correctly.
Some allocators may fail to raise a MemoryError when there is no more memory
left and will just trigger a SIGKILL from operating system or deadlock instead.
To avoid such situations, put an artificial software allocation imit trough the
max_alloc_bytes parameter. Exceeding this allocation limit will throw a proper
MemoryError.
Examples:
*Host allocation on ubuntu 16.04 => SIGKILL
*OpenCl AMD mesa open source driver on ubuntu 16.04 => deadlock
*OpenCl Nvidia 375.20 driver on ubuntu 16.04 => work as expected
"""
super().__init__(**kwds)
check_instance(allocator, AllocatorBase)
default_limit = (
int(0.80 * virtual_memory().total) if allocator.is_on_host() else None
)
max_alloc_bytes = max_alloc_bytes or default_limit
verbose = verbose if isinstance(verbose, bool) else __DEBUG__
check_instance(name, str)
check_instance(mantissa_bits, int)
check_instance(max_alloc_bytes, int, allow_none=True)
check_instance(verbose, bool)
self.name = name.strip()
self.allocator = allocator
self.verbose = verbose or True
self.bin_nr_to_bin = {}
self.alloc_statistics = {}
self.mantissa_bits = mantissa_bits
self.mantissa_mask = (1 << mantissa_bits) - 1
self.allocated_bytes = 0
self.max_alloc_bytes = max_alloc_bytes
if self.allocator.is_deferred:
msg = (
"Memory pools expect non-deferred "
"semantics from their allocators. You passed a deferred "
"allocator, i.e. an allocator whose allocations can turn out to "
"be unavailable long after allocation."
)
raise RuntimeError(msg)
self.active_blocks = 0
self.stop_holding_flag = False
@abstractmethod
def _wrap_buffer(self, buf, alloc_sz, size, alignment):
"""
Wrap allocated buffer into a PooledBuffer.
"""
pass
[docs]
def may_alloc(self, size):
"""
Return true if this pool may allocate a buffer of given size (in bytes).
Only logical bound is checked (max_alloc_bytes).
"""
alloc_sz = self.alloc_size(self.bin_number(size))
return self._may_alloc(alloc_sz)
def _may_alloc(self, alloc_sz):
if self.max_alloc_bytes is None:
return True
else:
return self.allocated_bytes + alloc_sz <= self.max_alloc_bytes
[docs]
def bin_number(self, size):
"""
Returns the bin number in witch a buffer of given size would be put.
"""
l = bitlog2(size)
mantissa_bits = self.mantissa_bits
if l >= mantissa_bits:
shifted = size >> (l - mantissa_bits)
else:
shifted = size << (mantissa_bits - l)
assert not (size and (shifted & (1 << mantissa_bits)) == 0)
chopped = shifted & self.mantissa_mask
return l << mantissa_bits | chopped
[docs]
def alloc_size(self, bin_nr):
"""
Compute real allocation size given an bin number.
Note that alloc_size(bin_number(x)) is always >= x.
"""
mantissa_bits = self.mantissa_bits
exponent = bin_nr >> mantissa_bits
mantissa = bin_nr & self.mantissa_mask
exp_minus_mbits = exponent - mantissa_bits
if exp_minus_mbits >= 0:
ones = (1 << exp_minus_mbits) - 1
head = ((1 << mantissa_bits) | mantissa) << exp_minus_mbits
else:
ones = 0
head = ((1 << mantissa_bits) | mantissa) >> -exp_minus_mbits
assert not (ones & head)
return head | ones
[docs]
def stop_holding(self):
"""
Tells the pool to stop holding back freed buffer
and direclty free all unused buffers.
"""
self.stop_holding_flag = True
self.free_held()
[docs]
def free_held(self):
"""
Free all unused held buffers but does not set
the stop_holding_flag flag.
"""
for _ in self._try_to_free_memory():
pass
@property
def held_blocks(self):
"""
Returns the number of held blocks.
"""
return sum(len(bin_list) for bin_list in self.bin_nr_to_bin.values())
[docs]
def allocate_aligned(self, size, alignment):
"""
Same as allocate for a memory pool.
"""
return self.allocate(nbytes=size, alignment=alignment)
[docs]
def allocate(self, nbytes, alignment=None):
"""
Allocate a buffer of size nbytes and given alignment.
The real size of the allocated buffer may be greater and wasted memory mostly
depend on configured mantissa_bits (default is 2).
The returned buffer is an instance of hysop.core.memory.buffer.PooledBuffer.
While a reference to the returned object is kept, it won't return to the pool.
"""
alignment = alignment or 1
assert alignment > 0
assert not (alignment & alignment - 1), "alignment is not a power of 2."
# we may need more memory to align returned ptr
min_alloc_size = nbytes + alignment - 1
# maybe bin allocation size will be sufficient
bin_nr = self.bin_number(nbytes)
alloc_sz = self.alloc_size(bin_nr)
# else we choose a bin that can provide min_alloc_size bytes
if alloc_sz < min_alloc_size:
bin_nr = self.bin_number(min_alloc_size)
alloc_sz = self.alloc_size(bin_nr)
bin_list = self.bin_nr_to_bin.setdefault(bin_nr, [])
assert self.bin_number(alloc_sz) == bin_nr
size = nbytes
stat_nr = bitlog2(size)
statistic = self.alloc_statistics.setdefault(
stat_nr, PoolAllocationStatistics()
)
verbose = self.verbose
if bin_list:
if verbose:
msg = "{} allocation request of size {} served from bin {}."
msg = msg.format(self.header(), bytes2str(size, decimal=False), bin_nr)
print(msg)
self.active_blocks += 1
statistic.push_reuse(alloc_sz)
return self._wrap_buffer(bin_list.pop(), alloc_sz, size, alignment)
if self._may_alloc(alloc_sz):
try:
with Timer() as t:
result = self.allocator(alloc_sz)
self.active_blocks += 1
self.allocated_bytes += alloc_sz
statistic.push_alloc(alloc_sz, t.interval)
if verbose:
msg = "{} allocated new block of size {} to serve a {} request."
msg = msg.format(
self.header(),
bytes2str(alloc_sz, decimal=False),
bytes2str(size, decimal=False),
)
print(msg)
return self._wrap_buffer(result, alloc_sz, size, alignment)
except MemoryError as e:
if verbose:
msg = "{} allocation of size {} failed, freeing unused blocks."
msg = msg.format(self.header(), bytes2str(alloc_sz, decimal=False))
print(msg)
else:
prefix = " " * len(self.header())
allocated_bytes = self.allocated_bytes
max_alloc_bytes = self.max_alloc_bytes
available = max_alloc_bytes - allocated_bytes
msg = "{} allocating {} would exceed pool max allocation limits:"
msg += "\n{p} *current {}"
msg += "\n{p} *max {}"
msg += "\n{p} *available {}"
msg += "\n{p} => trying to free unused blocks before allocation."
msg = msg.format(
self.header(),
bytes2str(alloc_sz, decimal=False),
bytes2str(allocated_bytes, decimal=False),
bytes2str(max_alloc_bytes, decimal=False),
bytes2str(available, decimal=False),
p=prefix,
)
print(msg)
freed_bytes = 0
try_last_alloc = False
for fb in self._try_to_free_memory():
may_alloc = self._may_alloc(alloc_sz)
if fb is None:
# all unused block were freed, last chance to allocate
if may_alloc:
try_last_alloc = True
else:
prefix = " " * len(self.header())
allocated_bytes = self.allocated_bytes
max_alloc_bytes = self.max_alloc_bytes
available = max_alloc_bytes - allocated_bytes
msg = "{} all unused blocks freed but allocation would "
msg += "still exceed pool limits:"
msg += "\n{p} *current {}"
msg += "\n{p} *max {}"
msg += "\n{p} *available {}"
msg = msg.format(
self.header(),
bytes2str(allocated_bytes, decimal=False),
bytes2str(max_alloc_bytes, decimal=False),
bytes2str(available, decimal=False),
p=prefix,
)
print(msg)
else:
freed_bytes += fb
if ((freed_bytes >= alloc_sz) and may_alloc) or try_last_alloc:
try:
with Timer() as t:
result = self.allocator(alloc_sz)
self.active_blocks += 1
self.allocated_bytes += alloc_sz
statistic.push_alloc(alloc_sz, t.interval)
if verbose:
msg = "{} allocation succeded after block destruction."
msg = msg.format(self.header())
print(msg)
return (self, result, alloc_sz, size, alignment)
except MemoryError:
pass
msg = "{} no more free blocks left, allocation failed."
msg = msg.format(self.header())
print(msg)
print()
print(memory_repport())
if statistic.nallocs == 0:
self.alloc_statistics.pop(stat_nr)
self.print_allocation_report()
raise MemoryError(msg)
__call__ = allocate
[docs]
def free(self, buf, size):
bin_nr = self.bin_number(size)
stat_nr = bitlog2(size)
statistics = self.alloc_statistics[stat_nr]
self.active_blocks -= 1
if not self.stop_holding_flag:
self.bin_nr_to_bin.setdefault(bin_nr, []).append(buf)
if self.verbose:
msg = "{} block of size {} returned to bin {}." # wich now contains {} entries.'
msg = msg.format(self.header(), bytes2str(size, decimal=False), bin_nr)
# len(self.bin_nr_to_bin[bin_nr]))
print(msg)
statistics.push_return(size)
else:
if self.verbose:
msg = "{} freeing block of size {} in bin {}."
msg = msg.format(self.header(), bytes2str(size), bin_nr)
print(msg)
with Timer() as t:
self.allocator.free(buf)
self.allocated_bytes -= size
statistics.push_free(size, t.interval)
def _try_to_free_memory(self):
for bin_nr, bin_list in self.bin_nr_to_bin.items():
if not bin_list:
continue
size = bin_list[0].size
stat_nr = bitlog2(size)
statistics = self.alloc_statistics[stat_nr]
while bin_list:
block = bin_list.pop()
if self.verbose:
msg = "{} freeing block of size {}."
msg = msg.format(self.header(), bytes2str(size, decimal=False))
print(msg)
with Timer() as t:
self.allocator.free(block)
self.allocated_bytes -= size
statistics.push_free(size, t.interval)
yield size
yield None
[docs]
def allocation_report(self):
"""
Returns various statistics of this pool as a string.
"""
stats = self.alloc_statistics.values()
nrequests = sum(v.nrequests for v in stats)
nallocs = sum(v.nallocs for v in stats)
nreuses = sum(v.nreuses for v in stats)
nreturns = sum(v.nreturns for v in stats)
nfrees = sum(v.nfrees for v in stats)
ballocs = sum(v.ballocs for v in stats)
breuses = sum(v.breuses for v in stats)
bfrees = sum(v.bfrees for v in stats)
breturns = sum(v.breturns for v in stats)
active_blocks = self.active_blocks
held_blocks = self.held_blocks
allocated_bytes = self.allocated_bytes
assert active_blocks + held_blocks == nallocs - nfrees
assert allocated_bytes == ballocs - bfrees
assert active_blocks == nallocs + nreuses - nreturns
assert held_blocks == nreturns - nreuses - nfrees
assert nrequests == nallocs + nreuses
width = 0
for n in [active_blocks, held_blocks, nrequests, nallocs, nreuses, nfrees]:
if n == 0:
continue
width = max(width, int(math.ceil(math.log10(n))))
ss = f"== Memory pool {self.name} allocation report =="
ss += "\n Global pool statistics:"
ss += "\n {:>{width}} blocks active {} ({})".format(
active_blocks,
bytes2str(ballocs + breuses - breturns, decimal=False),
bytes2str(ballocs + breuses - breturns),
width=width,
)
ss += "\n {:>{width}} blocks held {} ({})".format(
held_blocks,
bytes2str(breturns - breuses - bfrees, decimal=False),
bytes2str(breturns - breuses - bfrees),
width=width,
)
ss += "\n"
ss += "\n {:>{width}} blocks requested {} ({})".format(
nrequests,
bytes2str(ballocs + breuses, decimal=False),
bytes2str(ballocs + breuses),
width=width,
)
ss += "\n {:>{width}} blocks reused {} ({})".format(
nreuses, bytes2str(breuses, decimal=False), bytes2str(breuses), width=width
)
ss += "\n {:>{width}} blocks allocated {} ({})".format(
nallocs, bytes2str(ballocs, decimal=False), bytes2str(ballocs), width=width
)
ss += "\n {:>{width}} blocks freed {} ({})".format(
nfrees, bytes2str(bfrees, decimal=False), bytes2str(bfrees), width=width
)
ss += "\n"
ss += "\n Detailed pool statistics:"
has_stats = False
for stat_nr, stat in self.alloc_statistics.items():
has_stats = True
ss += "\n {:>10} <{} x <= {:<10} => {}".format(
bytes2str(2 ** (stat_nr), decimal=False),
"=" if stat_nr == 0 else " ",
bytes2str(2 ** (stat_nr + 1), decimal=False),
stat,
)
if not has_stats:
ss += "\n *no allocated blocks*"
ss += "\n\n Held blocks:"
has_block = False
for bin_nr in sorted(self.bin_nr_to_bin.keys()):
blocks = self.bin_nr_to_bin[bin_nr]
nblocks = len(blocks)
if nblocks == 0:
continue
has_block = True
mean_bytes = sum(b.size for b in blocks) / float(nblocks)
ss += "\n *bin {}: nblocks={} mean_block_size={}".format(
bin_nr, nblocks, bytes2str(mean_bytes)
)
if not has_block:
ss += "\n *no held blocks, all blocks are in use*"
ss += "\n=========================================="
return ss
[docs]
def print_allocation_report(self):
"""
Print various statistics of this pool.
"""
print()
print(self.allocation_report())
print()
[docs]
class PoolAllocationStatistics:
def __init__(self):
# counters
self.nrequests = 0
self.nallocs = 0
self.nfrees = 0
self.nreturns = 0
self.nreuses = 0
# bytes
self.ballocs = 0
self.bfrees = 0
self.breuses = 0
self.breturns = 0
# profiling
self.tallocs = 0
self.tfrees = 0
[docs]
def push_alloc(self, size, t):
self.nrequests += 1
self.nallocs += 1
self.ballocs += size
self.tallocs += t
[docs]
def push_reuse(self, size):
assert self.nallocs > 0
self.nrequests += 1
self.nreuses += 1
self.breuses += size
[docs]
def push_return(self, size):
assert self.nallocs > 0
self.nreturns += 1
self.breturns += size
[docs]
def push_free(self, size, t):
assert self.nallocs > 0
self.nfrees += 1
self.bfrees += size
self.tfrees += t
[docs]
def reuse_factor(self):
if self.nallocs > 0:
return float(self.nreuses) / self.nrequests
else:
return 0
[docs]
def mean_alloc_time(self):
if self.nallocs > 0:
return self.tallocs / self.nallocs
else:
return 0
[docs]
def mean_free_time(self):
if self.nfrees > 0:
return self.tfrees / self.nfrees
else:
return 0
def __str__(self):
ss = "{:>4} requests | {:>4} reuse | {} allocs | {} frees".format(
self.nrequests,
f"{int(self.reuse_factor()*1000)/10.0}%" if self.nreuses else " no",
time2str(self.mean_alloc_time(), on_zero=" no"),
time2str(self.mean_free_time(), on_zero=" no"),
)
return ss